# Archive page navigation residue: home | CD-ROM | disk | FTP | other | search
# Source Generated with Decompyle++
# File: in.pyc (Python 2.6)
-
# Module metadata.
__author__ = 'Robert Ancell <bob27@users.sourceforge.net>'
__license__ = 'GNU General Public License Version 2'
__copyright__ = 'Copyright 2005-2006 Robert Ancell'
import os
import select
import signal
import sys
# Imported without an alias: the code refers to xml.dom.minidom and
# xml.parsers.expat by their full dotted names.
import xml.dom.minidom
import xml.parsers.expat

import cecp
import game
import uci

from defaults import *
# Protocol identifiers. loadProfiles() stores one of these in Profile.protocol
# and Player compares against them to choose its connection class.
CECP = 'CECP'
UCI = 'UCI'
-
class Option:
    """A single option belonging to an AI difficulty level.

    'value' is the option value (string); it defaults to the empty string.

    _loadLevel() additionally assigns a 'name' attribute on instances, but
    only when the XML <option> element carries a 'name' attribute, so 'name'
    is not guaranteed to exist.
    """

    # Class-level default, used until an instance assigns its own value.
    value = ''
-
-
class Level:
    """A difficulty level of an AI profile.

    'options' is the list of Option objects for this level; each instance
    gets its own, initially empty, list.

    _loadLevel() also assigns a 'name' attribute (string) to the instances
    it creates.
    """

    def __init__(self):
        self.options = []
-
-
-
class Profile:
    """Description of one chess engine that can be run as an AI.

    'name' is the profile name (string).
    'protocol' is the protocol identifier (loadProfiles() stores CECP or UCI).
    'path' is the engine executable found by detect(), or None if detect()
           found nothing. It is '' until detect() has been called.
    'executables' is the list of candidate executable file names (strings).
    'arguments' is the list of command line arguments for the engine (strings).
    'levels' maps a level name to its Level object.
    'profiles' is an empty dictionary created by the constructor; nothing in
               the visible code reads it. It is kept for compatibility.
    """

    def __init__(self):
        self.name = ''
        self.protocol = ''
        self.path = ''
        self.executables = []
        self.arguments = []
        self.profiles = {}
        # loadProfiles() assigns 'levels' and Player.__init__() reads it, so a
        # hand-built Profile needs a default or that lookup fails with
        # AttributeError instead of the KeyError Player expects.
        self.levels = {}

    def detect(self):
        """Search the directories in $PATH for one of the executables.

        Sets 'path' to the first existing file, trying every executable name
        in each directory before moving to the next directory. Sets 'path'
        to None when no candidate exists or $PATH is not set.
        """
        try:
            searchPath = os.environ['PATH'].split(os.pathsep)
        except KeyError:
            searchPath = []

        for directory in searchPath:
            for executable in self.executables:
                candidate = directory + os.sep + executable
                if os.path.isfile(candidate):
                    self.path = candidate
                    return

        self.path = None
-
-
-
- def _getXMLText(node):
- '''
- '''
- if len(node.childNodes) == 0:
- return ''
- if len(node.childNodes) > 1 or node.childNodes[0].nodeType != node.TEXT_NODE:
- raise ValueError
- node.childNodes[0].nodeType != node.TEXT_NODE
- return node.childNodes[0].nodeValue
-
-
def _loadLevel(node):
    """Build a Level from a <level> XML element.

    'node' is the DOM element describing the level.

    Returns the Level, or None if the element does not contain exactly one
    <name> element.
    Raises ValueError (from _getXMLText) if a name or option does not
    contain plain text.
    """
    level = Level()
    names = node.getElementsByTagName('name')
    if len(names) != 1:
        return None
    level.name = _getXMLText(names[0])

    for element in node.getElementsByTagName('option'):
        option = Option()
        option.value = _getXMLText(element)
        try:
            attribute = element.attributes['name']
        except KeyError:
            # An option without a 'name' attribute is still recorded; it
            # just carries no 'name'.
            pass
        else:
            option.name = _getXMLText(attribute)
        level.options.append(option)

    return level
-
-
def loadProfiles():
    """Load the AI profiles from the first usable configuration file.

    The candidates are tried in this order: LOCAL_AI_CONFIG (with '~'
    expanded), 'ai.xml' inside BASE_DIR, then 'ai.xml' relative to the current
    directory. A file that cannot be opened is skipped silently; a file that
    is not well-formed XML is skipped with a message.

    Returns a list of Profile objects, one per <ai> element found inside the
    <aiconfig> elements. The list is empty when no configuration was loaded.
    Raises AssertionError if an <ai> element has a missing or unknown 'type'
    attribute, or has no <name> or no <binary> element.
    Raises ValueError (from _getXMLText) if an element that should hold plain
    text holds something else.
    """
    profiles = []

    fileNames = [os.path.expanduser(LOCAL_AI_CONFIG),
                 os.path.join(BASE_DIR, 'ai.xml'),
                 'ai.xml']
    document = None
    for fileName in fileNames:
        try:
            document = xml.dom.minidom.parse(fileName)
        except IOError:
            continue
        except xml.parsers.expat.ExpatError:
            print('AI configuration from %s is invalid, ignoring' % fileName)
            continue
        else:
            # First file that parses wins
            break
    if document is None:
        print('WARNING: No AI configuration')
        return profiles

    for config in document.getElementsByTagName('aiconfig'):
        for ai in config.getElementsByTagName('ai'):
            try:
                protocolName = ai.attributes['type'].nodeValue
            except KeyError:
                raise AssertionError("AI entry has no 'type' attribute")
            if protocolName == 'cecp':
                protocol = CECP
            elif protocolName == 'uci':
                protocol = UCI
            else:
                raise AssertionError('Uknown AI type: %s' % repr(protocolName))

            # Only the first <name> is used
            names = ai.getElementsByTagName('name')
            if not len(names) > 0:
                raise AssertionError('AI entry has no <name> element')
            name = _getXMLText(names[0])

            binaries = ai.getElementsByTagName('binary')
            if not len(binaries) > 0:
                raise AssertionError('AI entry has no <binary> element')
            executables = [_getXMLText(x) for x in binaries]

            arguments = [_getXMLText(x) for x in ai.getElementsByTagName('argument')]

            # Levels that _loadLevel() rejects (returns None for) are dropped
            levels = {}
            for x in ai.getElementsByTagName('level'):
                level = _loadLevel(x)
                if level is not None:
                    levels[level.name] = level

            profile = Profile()
            profile.name = name
            profile.protocol = protocol
            profile.executables = executables
            profile.arguments = arguments
            profile.levels = levels
            profiles.append(profile)

    return profiles
-
-
class CECPConnection(cecp.Connection):
    """cecp.Connection that forwards its callbacks to an AI Player."""

    def __init__(self, player):
        """Constructor.

        'player' is the Player that owns this connection. It receives the
                 engine output, log text and moves.
        """
        self.player = player
        cecp.Connection.__init__(self)

    def onOutgoingData(self, data):
        """Called by cecp.Connection"""
        self.player.logText(data, 'output')
        self.player.sendToEngine(data)

    def onMove(self, move):
        """Called by cecp.Connection"""
        if self.player.isReadyToMove():
            self.player.moving = True
            self.player.move(move)
        else:
            # The player cannot move yet: keep the move so that
            # Player.readyToMove() can play it later. Only one move can be
            # held at a time.
            if self.player.suppliedMove is not None:
                raise AssertionError('Engine supplied a move while one is already pending')
            self.player.suppliedMove = move

    def logText(self, text, style):
        """Called by cecp.Connection"""
        self.player.logText(text, style)
-
-
-
class UCIConnection(uci.StateMachine):
    """uci.StateMachine that forwards its callbacks to an AI Player."""

    def __init__(self, player):
        """Constructor.

        'player' is the Player that owns this connection. It receives the
                 engine output, log text and moves.
        """
        self.player = player
        uci.StateMachine.__init__(self)

    def onOutgoingData(self, data):
        """Called by uci.StateMachine"""
        self.player.logText(data, 'output')
        self.player.sendToEngine(data)

    def logText(self, text, style):
        """Called by uci.StateMachine"""
        self.player.logText(text, style)

    def onMove(self, move):
        """Called by uci.StateMachine"""
        self.player.move(move)
-
-
-
- def _cDied(sig, stackFrame):
-
- try:
- (pid, status) = os.waitpid(-1, os.WNOHANG)
- except OSError:
- pass
-
-
- signal.signal(signal.SIGCHLD, _cDied)
-
class Player(game.ChessPlayer):
    """Chess player whose moves come from an external chess engine.

    The constructor forks a monitor process, which in turn forks the engine
    and copies data between the application and the engine over pipes.
    """

    def __init__(self, name, profile, level = 'normal'):
        """Constructor for an AI player.

        'name' is the name of the player (string).
        'profile' is the profile to use for the AI (Profile).
        'level' is the difficulty level to use (string).

        If the monitor process cannot be forked a message is printed and the
        player is left without engine descriptors: read() then returns False
        and sendToEngine() discards its data.
        Raises AssertionError if the profile protocol is neither CECP nor UCI.
        """
        self.__profile = profile
        self.__level = level

        # 'moving' is set while this player is making a move that came from
        # the engine; 'suppliedMove' holds an engine move received before the
        # player was ready to move.
        self.moving = False
        self.suppliedMove = None

        game.ChessPlayer.__init__(self, name)

        # One pipe per direction between the application and the monitor
        (toManagerOutput, toManagerInput) = os.pipe()
        (fromManagerOutput, fromManagerInput) = os.pipe()
        self.__toEngineFd = toManagerInput
        self.__fromEngineFd = fromManagerOutput

        try:
            self.__pid = os.fork()
        except OSError as e:
            print('Monitor failed to fork: %s' % e)
            for fd in (toManagerInput, toManagerOutput, fromManagerInput, fromManagerOutput):
                os.close(fd)
            self.__toEngineFd = None
            self.__fromEngineFd = None
            self.__pid = None
        else:
            if self.__pid == 0:
                # Monitor process. It must never return into the
                # application's code, whatever happens.
                try:
                    os.close(toManagerInput)
                    os.close(fromManagerOutput)
                    self._runMonitor(fromManagerInput, toManagerOutput)
                    os.close(toManagerOutput)
                    os.close(fromManagerInput)
                finally:
                    os._exit(0)

            # Application process: close the ends that belong to the monitor
            os.close(toManagerOutput)
            os.close(fromManagerInput)

        if profile.protocol == CECP:
            self.connection = CECPConnection(self)
        elif profile.protocol == UCI:
            self.connection = UCIConnection(self)
        else:
            raise AssertionError('Unknown AI protocol: %s' % repr(profile.protocol))

        self.connection.start()
        self.connection.startGame()

        # An unknown level name falls back to an unparameterised configure()
        try:
            level = self.__profile.levels[self.__level]
        except KeyError:
            self.connection.configure()
        else:
            self.connection.configure(level.options)

    def logText(self, text, style):
        """Receive log text from the connection.

        'text' is the text to log (string).
        'style' is the style of the text (string, e.g. 'output').

        This implementation does nothing.
        """
        pass

    def getProfile(self):
        """Get the AI profile this AI is using.

        Returns a 2-tuple containing the profile name (str) and the difficulty level (str).
        """
        return (self.__profile.name, self.__level)

    def fileno(self):
        """Returns the file descriptor engine data is read from (integer).

        Returns None after quit() or if the monitor could not be started.
        """
        return self.__fromEngineFd

    def read(self):
        """Read and process data from the engine.

        This is non-blocking.

        Returns True when there is currently nothing more to read, False if
        the engine is disconnected or reading failed (the player is then
        shut down with _die()).
        """
        while True:
            # Checked on every pass: processing the data may disconnect us
            if self.__fromEngineFd is None:
                return False

            # Poll with a zero timeout so that this never blocks
            (rlist, _, xlist) = select.select([self.__fromEngineFd], [], [self.__fromEngineFd], 0)
            if len(rlist) + len(xlist) == 0:
                return True

            try:
                data = os.read(self.__fromEngineFd, 256)
            except OSError as e:
                print('Error reading from chess engine: ' + str(e))
                self._die()
                return False

            # A zero length read means the other end of the pipe was closed
            if len(data) == 0:
                print('Engine has died')
                self._die()
                return False

            self.connection.registerIncomingData(data)

    def sendToEngine(self, data):
        """Send data to the engine.

        'data' is the data to write (string).

        The data is discarded if the engine is disconnected. A failed write
        is reported with a message and otherwise ignored.
        """
        if self.__toEngineFd is None:
            return

        try:
            os.write(self.__toEngineFd, data)
        except OSError as e:
            print('Failed to write to engine: ' + str(e))

    def quit(self):
        """Disconnect the AI"""
        # Forget the descriptors first so that nothing else uses them
        fd = self.__toEngineFd
        self.__toEngineFd = None
        self.__fromEngineFd = None

        # NOTE(review): the descriptors are forgotten but not closed here -
        # confirm whether another part of the application closes them.
        if fd is None:
            return
        try:
            os.write(fd, '\nquit\n')
        except OSError:
            # The other end is already gone, nothing left to tell it
            return

    def onPlayerMoved(self, player, move):
        """Called by game.ChessPlayer"""
        if self.__toEngineFd is None:
            return

        # The move is our own only if we were relaying an engine move
        isSelf = player is self and self.moving
        self.moving = False
        self.connection.reportMove(move.canMove, isSelf)

    def onUndoMove(self):
        """Called by game.ChessPlayer"""
        self.connection.undoMove()

    def readyToMove(self):
        """Called by game.ChessPlayer"""
        if self.__toEngineFd is None:
            self.die()
            return

        currentGame = self.getGame()
        whiteTime = currentGame.getWhite().getRemainingTime()
        blackTime = currentGame.getBlack().getRemainingTime()
        if currentGame.getWhite() is self:
            ownTime = whiteTime
        else:
            ownTime = blackTime

        if self.suppliedMove is None:
            self.connection.requestMove(whiteTime, blackTime, ownTime)
        else:
            # The engine already supplied its move: play it now
            self.moving = True
            move = self.suppliedMove
            self.suppliedMove = None
            self.move(move)

    def onGameEnded(self, game):
        """Called by game.ChessPlayer"""
        self.quit()

    def _die(self):
        """Disconnect from the engine, then call die() on this player."""
        self.quit()
        self.die()

    def _runEngine(self, toEngineFd, fromEngineFd):
        """Replace the current process with the chess engine.

        'toEngineFd' is the descriptor that becomes the engine's stdin (integer).
        'fromEngineFd' is the descriptor that becomes the engine's stdout and stderr (integer).

        Only returns if the engine could not be executed (OSError from execv).
        """
        # Raise the niceness by 19 (presumably so the engine does not starve
        # the application of CPU time)
        os.nice(19)

        # Run from LOG_DIR, creating it if required. Failures are ignored and
        # the engine then runs from the current directory.
        try:
            os.mkdir(LOG_DIR)
        except OSError:
            pass
        try:
            os.chdir(LOG_DIR)
        except OSError:
            pass

        os.dup2(toEngineFd, sys.stdin.fileno())
        os.dup2(fromEngineFd, sys.stdout.fileno())
        os.dup2(fromEngineFd, sys.stderr.fileno())

        try:
            os.execv(self.__profile.path, [self.__profile.path] + self.__profile.arguments)
        except OSError:
            # The caller exits the process
            pass

    def _runMonitor(self, toApplicationFd, fromApplicationFd):
        """Run the monitor process: start the engine and relay its data.

        'toApplicationFd' is the descriptor to write engine output to (integer).
        'fromApplicationFd' is the descriptor to read application data from (integer).

        Never returns normally. The process exits with status 1 if the engine
        cannot be forked. Otherwise it exits with status 0 once a pipe
        reaches end of data or any error occurs, after sending SIGQUIT to
        the engine.
        """
        # One pipe per direction between the monitor and the engine
        (toEngineOutput, toEngineInput) = os.pipe()
        (fromEngineOutput, fromEngineInput) = os.pipe()

        try:
            enginePID = os.fork()
        except OSError as e:
            print('Monitor failed to fork: %s' % e)
            os._exit(1)

        if enginePID == 0:
            # Engine process. It must never return into the monitor's code.
            try:
                os.close(toApplicationFd)
                os.close(fromApplicationFd)
                os.close(toEngineInput)
                os.close(fromEngineOutput)
                self._runEngine(toEngineOutput, fromEngineInput)
            finally:
                os._exit(0)

        # Monitor process: close the ends that belong to the engine
        os.close(toEngineOutput)
        os.close(fromEngineInput)

        inputPipes = (fromApplicationFd, fromEngineOutput)
        # Where the data read from each input is written to
        targets = {fromApplicationFd: toEngineInput,
                   fromEngineOutput: toApplicationFd}
        pipes = (toApplicationFd, fromApplicationFd, toEngineInput, fromEngineOutput)

        # The loop only ends with an exception (including the 'End of data'
        # one raised below); whatever it is, stop the engine and exit.
        try:
            while True:
                (rfds, _, xfds) = select.select(inputPipes, [], pipes, None)
                for fd in rfds:
                    data = os.read(fd, 65535)
                    if len(data) == 0:
                        raise OSError('End of data')
                    os.write(targets[fd], data)
        finally:
            try:
                os.kill(enginePID, signal.SIGQUIT)
            except OSError:
                pass
            os._exit(0)
-
-
-
-